package com.totoro.zk.cfmc.register ;

import java.util.concurrent.atomic.AtomicReference ;

import org.apache.curator.RetryPolicy ;
import org.apache.curator.framework.CuratorFramework ;
import org.apache.curator.framework.CuratorFrameworkFactory ;
import org.apache.curator.framework.recipes.cache.NodeCache ;
import org.apache.curator.framework.recipes.cache.NodeCacheListener ;
import org.apache.curator.framework.state.ConnectionState ;
import org.apache.curator.framework.state.ConnectionStateListener ;
import org.apache.curator.retry.ExponentialBackoffRetry ;
import org.apache.curator.retry.RetryNTimes ;
import org.apache.curator.utils.CloseableUtils ;
import org.apache.zookeeper.CreateMode ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;

import com.totoro.zk.cfmc.ex.ClientException ;

/**
 * Manages the connection to a ZooKeeper service through Apache Curator.
 *
 * <p>An instance is obtained through {@link #init(String)}, which also starts
 * the underlying {@link CuratorFramework}. The client is started lazily by
 * {@link #getClient()} and released by {@link #close()}.
 *
 * <p>Original author's notes (possible future work): allow different client
 * configurations - timeouts, the choice of retry policy, and the retry policy
 * parameters (retry count, interval between retries, total retry window).
 *
 * @author 80002165 @date 2017-06-06 08:44:34
 */
public class CuratorClient {
    // Kept package-visible (as in the original) so same-package code that reads it still compiles.
    static final Logger logger = LoggerFactory.getLogger(CuratorClient.class);

    /** Root node of the configuration tree. */
    private static final String ROOT = "/cfmc";
    private static final int DEF_SESSION_TIMEOUT = 10000;
    private static final int DEF_CONNECT_TIMEOUT = 15000;
    /** Maximum number of retries performed by the default retry policy. */
    private static final int DEF_RETRY_COUNT = 3;
    /** Sleep between two retries of the default retry policy, in milliseconds. */
    private static final int DEF_RETRY_SLEEP_MS = 1000;

    private String connectString;

    /** Client connected to ZooKeeper; {@code null} until started and after {@link #close()}. */
    private CuratorFramework client;

    /** Cache watching {@link #ROOT}; {@code null} until {@link #addRootNodeListener()} succeeds. */
    private NodeCache rootNodeCache;

    /** Holds the first framework instance created through {@link #init(String)}. */
    private static final AtomicReference<CuratorFramework> cache = new AtomicReference<CuratorFramework>();

    private CuratorClient() {
    }

    private CuratorClient(String connectString) {
        this.connectString = connectString;
    }

    /**
     * Creates and starts the Curator client with the default timeouts and a
     * fixed-count retry policy, and registers a listener that logs connection
     * state changes.
     *
     * <p>Calling this method while a client is already running is a no-op, so
     * an explicit {@code start()} after {@link #getClient()} or
     * {@link #init(String)} does not replace (and leak) the running client.
     */
    public synchronized void start() {
        if (client != null) {
            return;
        }

        // RetryNTimes takes (retryCount, sleepMsBetweenRetries) - note the order.
        RetryPolicy retry = new RetryNTimes(DEF_RETRY_COUNT, DEF_RETRY_SLEEP_MS);
        CuratorFramework created = CuratorFrameworkFactory.newClient(connectString, DEF_SESSION_TIMEOUT,
                DEF_CONNECT_TIMEOUT, retry);

        // Connection state listener: only logs the transitions.
        created.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curator, ConnectionState state) {
                if (state == ConnectionState.LOST) {
                    logger.info("zookeeper客户端失去链接");
                } else if (state == ConnectionState.CONNECTED) {
                    logger.info("zookeeper客户端链接成功");
                } else if (state == ConnectionState.RECONNECTED) {
                    logger.info("zookeeper客户端重新链接成功");
                } else if (state == ConnectionState.SUSPENDED) {
                    logger.info("zookeeper客户端暂停");
                }
            }
        });

        try {
            created.start();
        } catch (RuntimeException e) {
            // Do not keep a half-initialised client around.
            CloseableUtils.closeQuietly(created);
            throw e;
        }
        // Publish the client only once it has been started successfully.
        client = created;

        // The root node is not created here; createNode() creates missing parents on demand.
    }

    /**
     * Builds (without starting) a client using Curator's default timeouts and
     * an exponential back-off retry policy (base sleep 1000 ms, at most 3 retries).
     *
     * @param connectionString ZooKeeper connection string
     * @return a new, not yet started framework instance
     */
    private static CuratorFramework createSimple(String connectionString) {
        // Reasonable arguments for ExponentialBackoffRetry: the first retry waits
        // 1 second, the second up to 2 seconds, the third up to 4 seconds.
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // The simplest way to get a CuratorFramework instance: only the
        // connection string and the retry policy are required.
        return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
    }

    /**
     * Builds (without starting) a client with explicit retry policy and timeouts.
     *
     * @param connectionString    ZooKeeper connection string
     * @param retryPolicy         retry policy to use
     * @param connectionTimeoutMs connection timeout in milliseconds
     * @param sessionTimeoutMs    session timeout in milliseconds
     * @return a new, not yet started framework instance
     */
    private static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy,
            int connectionTimeoutMs, int sessionTimeoutMs) {
        // CuratorFrameworkFactory.builder() gives fine grained control over the
        // creation options; see the CuratorFrameworkFactory.Builder javadoc.
        return CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                .build();
    }

    /**
     * Registers a listener on the root node that logs every change of that node.
     *
     * <p>The watching {@link NodeCache} is kept by this instance and released in
     * {@link #close()}. Calling the method again while the cache is active does
     * nothing. Failures are logged and not propagated.
     *
     * @author 80002165 @date 2017-06-06 09:26:07
     */
    public synchronized void addRootNodeListener() {
        if (rootNodeCache != null) {
            return;
        }
        try {
            final NodeCache nodeCache = new NodeCache(getClient(), ROOT, false);
            try {
                // Register the listener before starting so no notification can be missed.
                nodeCache.getListenable().addListener(new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        // getCurrentData() is null when the node does not exist (e.g. it was deleted).
                        if (nodeCache.getCurrentData() == null) {
                            logger.info(String.format("node %s removed", ROOT));
                        } else {
                            logger.info(String.format("node %s data changed", ROOT));
                        }
                    }
                });
                nodeCache.start(true);
            } catch (Exception e) {
                CloseableUtils.closeQuietly(nodeCache);
                throw e;
            }
            rootNodeCache = nodeCache;
        } catch (Exception e) {
            logger.error(String.format("failed to add listener on node %s", ROOT), e);
        }
    }

    /**
     * Creates a node, creating missing parent nodes as needed. Failures are
     * logged and not propagated.
     *
     * @param path       absolute path of the node
     * @param data       payload handed to Curator as-is (may be {@code null})
     * @param persistent {@code true} for a persistent node, {@code false} for an ephemeral one
     */
    public void createNode(String path, byte[] data, boolean persistent) {
        CreateMode createMode = persistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
        try {
            getClient().create().creatingParentsIfNeeded().withMode(createMode).forPath(path, data);
        } catch (Exception e) {
            logger.error(String.format("failed to create node %s", path), e);
        }
    }

    /**
     * Creates a node without payload; see {@link #createNode(String, byte[], boolean)}.
     *
     * @param path       absolute path of the node
     * @param persistent {@code true} for a persistent node, {@code false} for an ephemeral one
     */
    public void createNode(String path, boolean persistent) {
        createNode(path, null, persistent);
    }

    /**
     * Replaces the data stored at the given node. Failures are logged and not
     * propagated.
     *
     * @param path absolute path of the node
     * @param data new payload
     */
    public void setNodeData(String path, byte[] data) {
        try {
            getClient().setData().forPath(path, data);
        } catch (Exception e) {
            logger.error(String.format("failed to set data of node %s", path), e);
        }
    }

    /**
     * Reads the data stored at the given node.
     *
     * @param path absolute path of the node
     * @return the node's payload as returned by Curator
     * @throws ClientException if the data cannot be read for any reason
     */
    public byte[] getNodeData(String path) {
        try {
            return getClient().getData().forPath(path);
        } catch (Exception e) {
            String msg = String.format("获取路径：%s下的节点数据失败！", path);
            logger.error(msg, e);
            throw new ClientException(msg, e);
        }
    }

    /**
     * Returns the underlying Curator client, starting it first if necessary.
     *
     * @return the started framework instance
     */
    public synchronized CuratorFramework getClient() {
        if (client == null) {
            // Lazily initialise the client.
            start();
        }
        return client;
    }

    /**
     * Releases the root node cache (if any) and the Curator client. A later
     * {@link #getClient()} call starts a fresh client.
     */
    public synchronized void close() {
        if (rootNodeCache != null) {
            CloseableUtils.closeQuietly(rootNodeCache);
            rootNodeCache = null;
        }
        if (client != null) {
            // Drop the static reference as well so it never points at a closed client.
            cache.compareAndSet(client, null);
            CloseableUtils.closeQuietly(client);
            client = null;
        }
    }

    /**
     * Creates a {@code CuratorClient} for the given address and starts it. The
     * framework of the first client created this way is remembered in the
     * static holder.
     *
     * @param address ZooKeeper connection string
     * @return the started client wrapper
     */
    public static CuratorClient init(String address) {
        CuratorClient curatorClient = new CuratorClient(address);
        cache.compareAndSet(null, curatorClient.getClient());
        return curatorClient;
    }

}
